#include "netlib/rpc/rpc_server.h"

#include "google/protobuf/service.h"
#include "netlib/base/logger.h"
#include "rpc.pb.h"

namespace netlib::rpc {

RpcServer::RpcServer(net::EventLoop* loop,
                     const net::InetAddress& listen_addr,
                     bool is_epoll,
                     std::string&& name,
                     int n_event_thread) :
    server_(loop, listen_addr, is_epoll, std::move(name), n_event_thread),
    codec_([this](const net::TcpConnectionPtr& conn,
                  std::unique_ptr<ProtobufMessage>&& msg,
                  Timestamp ts) { OnRpcMessage(conn, std::move(msg), ts); }) {
	server_.SetMessageCallback([this](const net::TcpConnectionPtr& conn, net::Buffer* buf,
	                                  Timestamp ts) { OnMessage(conn, buf, ts); });
}

void RpcServer::Register(::google::protobuf::Service* service) {
	auto desc = service->GetDescriptor();
	services_.emplace(desc->full_name(), service);
}

void RpcServer::OnMessage(const net::TcpConnectionPtr& conn, net::Buffer* buf, Timestamp ts) {
	LOG_DEBUG << "Server receive message, then parse it by codec";
	codec_.OnMessage(conn, buf, ts);
}

void RpcServer::OnRpcMessage(const net::TcpConnectionPtr& conn,
                             std::unique_ptr<ProtobufMessage>&& msg,
                             Timestamp ts) {
	LOG_DEBUG << "Receive RpcMessage: " << msg->DebugString();
	auto rmsg = dynamic_cast<RpcMessage*>(msg.get());
	assert(rmsg->type() == kRequest);
	auto it = services_.find(rmsg->service());
	if (it == services_.end()) {
		ErrorCode err = kNoService;
		assert(0);
	}
	auto service = it->second;
	auto method = service->GetDescriptor()->FindMethodByName(rmsg->method());
	assert(method);
	auto request = std::unique_ptr<ProtobufMessage>(service->GetRequestPrototype(method).New());
	auto res = request->ParseFromString(rmsg->request());
	assert(res);
	auto response = service->GetResponsePrototype(method).New();
	auto done = new RpcClosure(conn, response, rmsg->id(), codec_);
	service->CallMethod(method, nullptr, request.get(), response, done);
}
void RpcClosure::Run() {
	RpcMessage rmsg;
	rmsg.set_type(kResponse);
	rmsg.set_id(id_);
	rmsg.set_response(response_->SerializeAsString());
	codec_.Send(conn_, rmsg);
	delete this;
}
} // namespace netlib::rpc
